
feat(connect): add phase 3 metadata reporting #2081

Open
sayedbilalbari wants to merge 20 commits into NVIDIA:dev from sayedbilalbari:sbari-issue-2065

Conversation

@sayedbilalbari
Collaborator

@sayedbilalbari sayedbilalbari commented Apr 22, 2026

Fixes #2065

Summary

Adds Spark Connect reporting to profiler and qualification. Correlates Connect operations with SQL executions and Spark jobs, persists session/operation metadata and statement payloads, and exposes them through the Python API. No-ops for event logs without Connect activity.

What's emitted

Per application, under <perAppDir>/ (profiler) and <rootDir>/raw_metrics/<appId>/ (qualification):

  • connect_sessions.csv (CSV): one row per session with id, user, lifecycle timestamps, and operation count
  • connect_operations.csv (CSV): one row per operation with lifecycle timestamps, derived durations, status, correlated sqlIds/jobIds, and the sidecar statementFile basename
  • connect_statements/<operationId>.txt (directory): protobuf-text statementText sidecar per operation (skipped when empty)
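To make the table shapes concrete, here is a hedged Python sketch of consuming connect_operations.csv; the column names, their ordering, and the ';' delimiter for the correlated-ID columns are illustrative assumptions, not the tool's exact schema:

```python
import csv
import io

# Illustrative rows only; the real schema may use different names/delimiters.
OPS_CSV = """appId,sessionId,operationId,status,sqlIds,jobIds,statementFile
app-1,sess-a,op-1,FINISHED,0;1,0;1;2,op-1.txt
app-1,sess-a,op-2,FAILED,2,3,
app-1,sess-b,op-3,FINISHED,3,4,op-3.txt
"""

def correlated_sql_ids(row):
    """Split the sqlIds column (assumed ';'-delimited) into ints."""
    return [int(x) for x in row["sqlIds"].split(";") if x]

rows = list(csv.DictReader(io.StringIO(OPS_CSV)))
print([correlated_sql_ids(r) for r in rows])  # [[0, 1], [2], [3]]
```

An empty statementFile cell (as for op-2 above) means no sidecar was written for that operation.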

Python API

ResultHandler gains get_connect_statements_dir, list_connect_statement_ops, and load_connect_statement, plus generic get_table_path / get_per_app_table_path accessors. connectReport.yaml registers the three tables; ReportTableFormat.DIRECTORY covers the sidecar directory.
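As a rough illustration of what the sidecar accessors do, the sketch below approximates list_connect_statement_ops and load_connect_statement with plain pathlib over a local directory; the real ResultHandler methods live in user_tools and may differ in signature and error handling:

```python
import pathlib
import tempfile

def list_connect_statement_ops(statements_dir):
    """Operation IDs that have a statement sidecar: *.txt basenames, sans extension."""
    return sorted(p.stem for p in pathlib.Path(statements_dir).glob("*.txt"))

def load_connect_statement(statements_dir, operation_id):
    """Read one sidecar's protobuf-text payload as UTF-8."""
    return (pathlib.Path(statements_dir) / f"{operation_id}.txt").read_text(encoding="utf-8")

with tempfile.TemporaryDirectory() as d:
    pathlib.Path(d, "op-1.txt").write_text("relations { ... }", encoding="utf-8")
    pathlib.Path(d, "op-2.txt").write_text('sql { query: "SELECT 1" }', encoding="utf-8")
    print(list_connect_statement_ops(d))  # ['op-1', 'op-2']
```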

Implementation notes

  • AppBase carries Connect state (connectSessions, connectOperations, operationId → sqlIds/jobIds, jobTag → operationId); isConnectMode is set when either sessions or operations are present, so session-only logs still emit output.
  • Reverse indexes are populated from SQLExecutionStart.jobTags and JobStart.properties["spark.job.tags"].
  • ConnectStatementWriter routes through ToolTextFileWriter for UTF-8 and permission parity with other tool outputs, sanitizes operation IDs, and enforces path-traversal containment.
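The reverse-index population described above can be sketched in Python; the tag-to-operation mapping and the event shapes here are hypothetical stand-ins for the Scala AppBase state:

```python
from collections import defaultdict

# Hypothetical state mirroring AppBase: jobTag -> operationId, opId -> job IDs.
job_tag_to_op_id = {"tag-op-1": "op-1", "tag-op-2": "op-2"}
operation_id_to_job_ids = defaultdict(set)

def on_job_start(job_id, tags_property):
    """Correlate a job with a Connect operation via spark.job.tags."""
    for tag in tags_property.split(","):
        tag = tag.strip()
        if not tag:                      # empty-token filter, as noted above
            continue
        op_id = job_tag_to_op_id.get(tag)
        if op_id is not None:            # non-Connect tags are simply ignored
            operation_id_to_job_ids[op_id].add(job_id)

on_job_start(7, "tag-op-1,,other-tag")
print(dict(operation_id_to_job_ids))  # {'op-1': {7}}
```

The same shape applies to operationIdToSqlIds, keyed by the tags carried on SQLExecutionStart.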

Testing

  • mvn verify (Connect suites: ConnectCorrelationSuite, ConnectProfileResultsSuite, ConnectProfilerOutputSuite, ConnectStatementWriterSuite, QualificationConnectOutputSuite)
  • tox -e prepare,pylint,flake8
  • pytest user_tools/tests/spark_rapids_tools_ut/api/test_connect_*.py

sayedbilalbari and others added 12 commits April 21, 2026 13:41
Prep for Phase 3 reporting — indexes get populated from SQLExecutionStart.jobTags
and JobStart.spark.job.tags. Issue NVIDIA#2065.
…bTags

Uses reflective accessor to stay compatible with Spark 3.2-3.4 profiles.
Also tightens operationIdTo{Sql,Job}Ids value type to mutable.HashSet for
consistency with neighboring collections on AppBase. Issue NVIDIA#2065.
Row types for connect_sessions.csv and connect_operations.csv with derived phase
durations, status, sqlID/jobID joins, and statement-file metadata.
Issue NVIDIA#2065.
…rom profiler

Per-application. Absent (not empty) file when the app is not in Connect mode --
matches the behavior of every other per-app table. Issue NVIDIA#2065.
Keeps large protobuf-text plans out of connect_operations.csv. Files land under
connect_statements/<operationId>.txt, basename referenced in the statementFile
column of connect_operations.csv. Directory is not created when there are no
operations with non-empty statementText. Issue NVIDIA#2065.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…ation

Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
…iles

Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
@github-actions github-actions Bot added user_tools Scope the wrapper module running CSP, QualX, and reports (python) core_tools Scope the core module (scala) labels Apr 22, 2026
Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
@greptile-apps

greptile-apps Bot commented Apr 22, 2026

Greptile Summary

This PR adds Spark Connect phase 3 metadata reporting to both the profiler and qualification tools, correlating Connect sessions/operations with SQL executions and Spark jobs, persisting lifecycle metadata as per-app CSVs, and optionally writing statement payloads as sidecar .txt files. Previously identified concerns — O(n×m) session-op count scan, local-filesystem-only sidecar writes, missing empty-token filter on spark.job.tags, and load_connect_statement path traversal — are all addressed in this diff.

Confidence Score: 5/5

Safe to merge; no blocking issues found after full review of all changed files.

All previously flagged P1 concerns (Hadoop filesystem bypass, path traversal in Python reader, O(n×m) operation count, missing empty-token filter) are addressed in the current diff. No new P1 or P0 issues were identified. The implementation is consistent with existing patterns, column counts match between Scala headers and YAML catalogs, and the test suite covers correctness, sanitization, and golden roundtrips.

No files require special attention.

Important Files Changed

  • core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ConnectStatementWriter.scala: new file. Writes per-operation statementText sidecars through ToolTextFileWriter (Hadoop-aware). Sanitizes operationIds and adds a defense-in-depth path containment check via require. Clean implementation.
  • core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ConnectProfileResults.scala: new file. Defines ConnectSessionProfileResult and ConnectOperationProfileResult case classes with column counts matching OutHeaderRegistry and connectReport.yaml. Status derivation and duration logic are well documented.
  • core/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala: adds SQL and job correlation logic gated on isConnectMode. Empty-tag filtering added to the spark.job.tags split; the reflection call is wrapped in Try for older Spark compatibility.
  • user_tools/src/spark_rapids_tools/api_v1/result_handler.py: adds get_connect_statements_dir, list_connect_statement_ops, load_connect_statement, get_table_path, and get_per_app_table_path. Sanitizes operation_id before building the sub-path, addressing the path-traversal concern.
  • core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala: adds the writeConnectTables companion-object method; groups operation counts by session once (O(n) instead of O(n×m)); correct no-op guard via isConnectMode.
  • core/src/main/scala/org/apache/spark/sql/rapids/tool/util/EventUtils.scala: adds readJobTagsFromSQLStartEvent using cached reflection wrapped in Try; gracefully degrades on pre-3.5 Spark where the jobTags field does not exist.
  • core/src/main/resources/configs/reports/connectReport.yaml: new YAML report catalog. Column definitions match the Scala headers in OutHeaderRegistry; connectStatements uses the new DIRECTORY fileFormat with an empty conversion list.
  • core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala: adds operationIdToSqlIds and operationIdToJobIds reverse-index maps; isConnectMode broadened to include session-only logs.
  • core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala: wires writeConnectTables into qualification raw-metrics generation with optional hadoopConf passthrough. Change is minimal and consistent with the existing pattern.
  • user_tools/src/spark_rapids_tools/enums.py: adds DIRECTORY to the ReportTableFormat enum with an empty conversion list, preventing accidental format coercion for directory artifacts.
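The sanitize-then-contain pattern called out for ConnectStatementWriter and result_handler.py can be approximated as follows; the sanitizer's exact character set and the error type are assumptions, not the shipped implementation:

```python
import pathlib

def resolve_statement_path(statements_dir, operation_id):
    """Sanitize an operation id, then refuse any path escaping the sidecar dir."""
    # Keep only alphanumerics, dash, underscore; drops '/', '.', etc.
    safe_id = "".join(c for c in operation_id if c.isalnum() or c in "-_")
    base = pathlib.Path(statements_dir).resolve()
    target = (base / f"{safe_id}.txt").resolve()
    # Defense-in-depth backstop, mirroring the `require` containment check:
    # even if sanitization ever missed something, the write is refused.
    if target.parent != base:
        raise ValueError(f"path escapes statements dir: {operation_id!r}")
    return target

print(resolve_statement_path("connect_statements", "../evil").name)  # evil.txt
```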

Sequence Diagram

sequenceDiagram
    participant EL as Event Log
    participant EPB as EventProcessorBase
    participant AB as AppBase
    participant CEH as ConnectEventHandler
    participant P as Profiler/QualRawReportGenerator
    participant CSW as ConnectStatementWriter
    participant FS as Hadoop FileSystem

    EL->>CEH: ConnectOperation/Session events
    CEH->>AB: connectSessions/connectOperations.put(...)
    CEH->>AB: jobTagToConnectOpId.put(jobTag → opId)

    EL->>EPB: SparkListenerSQLExecutionStart
    EPB->>AB: isConnectMode?
    AB-->>EPB: true
    EPB->>EPB: readJobTagsFromSQLStartEvent (reflection, Spark 3.5+)
    EPB->>AB: operationIdToSqlIds.getOrElseUpdate(opId).add(sqlId)

    EL->>EPB: SparkListenerJobStart
    EPB->>AB: isConnectMode?
    AB-->>EPB: true
    EPB->>AB: operationIdToJobIds.getOrElseUpdate(opId).add(jobId)

    P->>P: writeConnectTables(writer, app, writeStatements, hadoopConf)
    P->>P: groupBy sessionId → opCountBySession
    P->>P: build ConnectSessionProfileResult rows
    P->>P: build ConnectOperationProfileResult rows
    alt writeStatementSidecars = true
        P->>CSW: writeStatementFiles(rootDir, ops, hadoopConf)
        CSW->>CSW: sanitize operationId → basename
        CSW->>CSW: require target.getParent == subDirPath
        CSW->>FS: ToolTextFileWriter.write(text)
        CSW-->>P: Map[operationId → basename]
    end
    P->>FS: writeCSVTable(Connect Sessions, sessionRows)
    P->>FS: writeCSVTable(Connect Operations, opRows)

Reviews (8): Last reviewed commit: "fix: add Spark Connect runtime dependenc..."

Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
@sayedbilalbari sayedbilalbari self-assigned this Apr 22, 2026
Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
- ConnectStatementWriter: use Hadoop FileSystem instead of java.nio.file
  so sidecar writes work for HDFS/S3/GCS, not just local paths
- Profiler.writeConnectTables: precompute opCountBySession map to avoid
  O(sessions * operations) scan when emitting connect_sessions.csv
- EventProcessorBase: filter empty tokens when splitting spark.job.tags
- Thread hadoopConf from Qualification/Profiler through
  QualRawReportGenerator to the sidecar writer
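The O(sessions × operations) → O(operations) change amounts to precomputing a session → count map in one pass, sketched here with hypothetical operation records:

```python
from collections import Counter

# Hypothetical shapes for illustration: each operation records its sessionId.
operations = [{"opId": f"op-{i}", "sessionId": f"sess-{i % 3}"} for i in range(9)]
sessions = [f"sess-{i}" for i in range(3)]

# O(sessions * operations): rescans every operation for each session row.
slow = {s: sum(1 for op in operations if op["sessionId"] == s) for s in sessions}

# O(operations): a single pass builds the whole count map up front.
op_count_by_session = Counter(op["sessionId"] for op in operations)

assert all(op_count_by_session[s] == slow[s] for s in sessions)
```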
@sayedbilalbari
Collaborator Author

@greptile review

- ConnectStatementWriter: route writes through ToolTextFileWriter and
  ensure trailing newline, resolving both scalastyle errors flagged by
  pre-merge CI.
- Fix scaladoc link warnings by using fully qualified names for
  ConnectSessionInfo / ConnectOperationInfo / OutHeaderRegistry and
  drop the @throws doc tag on EventUtils (annotation is retained).
- Correct stale sidecar path in ConnectProfileResults doc.
- Rewrite Connect test suite headers to remove agent-voice /
  incremental-step framing.
- Tighten list_connect_statement_ops docstring in result_handler.
- Expose get_table_path / get_per_app_table_path on APIResHandler and
  cover them with test_connect_helpers tests for wrapper/core handlers.
@sayedbilalbari
Collaborator Author

@greptileai review

Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
- label: connectStatements
description: >-
Directory of per-operation statementText sidecars. Each
<operationId>.txt file contains the protobuf debug-format text of the
Collaborator


Is there an industry-standard file extension for storing TextProto files?

Collaborator Author


Very valid point. The canonical protobuf TextFormat extension is .txtpb, which would imply a valid proto output. But in our case the statementText is a diagnostic payload that can be truncated, so .txt is the better fit: it does not claim to be a complete protobuf TextFormat file.

@parthosa
Collaborator

parthosa commented May 1, 2026

LGTM mostly. I had a concern about the version of the Spark JAR used by Tools. In case the Spark Connect JARs are not present, how do we handle the processing?

@sayedbilalbari
Collaborator Author

sayedbilalbari commented May 1, 2026

LGTM mostly. I had a concern about the version of the Spark JAR used by Tools. In case the Spark Connect JARs are not present, how do we handle the processing?

@parthosa Very valid point, thanks for pointing it out. Currently the tools package includes spark-3.5.7-bin-hadoop3.tgz as the default, which does not include the Connect jars.
The Connect distribution has to be added as a separate jar dependency through the Maven path.
Updated the user_tools runtime dependency to include the Connect jar download step and add it to the classpath.

FYI: even with a missing jar, the current behavior logs a ClassNotFound error and moves on to the next event.

Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
@sayedbilalbari sayedbilalbari requested a review from parthosa May 1, 2026 23:13

Labels

core_tools Scope the core module (scala) user_tools Scope the wrapper module running CSP, QualX, and reports (python)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[FEA] Extract and report Spark Connect session and operation metadata

3 participants